接收客户端消息

websocket服务,接收客户端的消息种类如下:

1
2
3
4
5
6
//WebsocketServer#init()
messageDispatcher.register(Command.HANDSHAKE, () -> new HandshakeHandler(mPushServer));
messageDispatcher.register(Command.BIND, () -> new BindUserHandler(mPushServer));
messageDispatcher.register(Command.UNBIND, () -> new BindUserHandler(mPushServer));
messageDispatcher.register(Command.PUSH, PushHandlerFactory::create);
messageDispatcher.register(Command.ACK, () -> new AckHandler(mPushServer));

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public final class WebsocketServer extends NettyTCPServer {

private final ChannelHandler channelHandler;

private final MessageDispatcher messageDispatcher;

private final ConnectionManager connectionManager;

private final MPushServer mPushServer;

public WebsocketServer(MPushServer mPushServer) {
super(CC.mp.net.ws_server_port);
this.mPushServer = mPushServer;
this.messageDispatcher = new MessageDispatcher();
this.connectionManager = new ServerConnectionManager(false);
this.channelHandler = new WebSocketChannelHandler(connectionManager, messageDispatcher);
}

@Override
public void init() {
super.init();
connectionManager.init();
messageDispatcher.register(Command.HANDSHAKE, () -> new HandshakeHandler(mPushServer));
messageDispatcher.register(Command.BIND, () -> new BindUserHandler(mPushServer));
messageDispatcher.register(Command.UNBIND, () -> new BindUserHandler(mPushServer));
messageDispatcher.register(Command.PUSH, PushHandlerFactory::create);
messageDispatcher.register(Command.ACK, () -> new AckHandler(mPushServer));
}

@Override
public void stop(Listener listener) {
super.stop(listener);
connectionManager.destroy();
}

@Override
public EventLoopGroup getBossGroup() {
return mPushServer.getConnectionServer().getBossGroup();
}

@Override
public EventLoopGroup getWorkerGroup() {
return mPushServer.getConnectionServer().getWorkerGroup();
}

@Override
protected void initPipeline(ChannelPipeline pipeline) {
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler(CC.mp.net.ws_path, null, true));
pipeline.addLast(new WebSocketIndexPageHandler());
pipeline.addLast(getChannelHandler());
}

@Override
protected void initOptions(ServerBootstrap b) {
super.initOptions(b);
b.option(ChannelOption.SO_BACKLOG, 1024);
b.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
b.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
}

@Override
public ChannelHandler getChannelHandler() {
return channelHandler;
}

public ConnectionManager getConnectionManager() {
return connectionManager;
}

public MessageDispatcher getMessageDispatcher() {
return messageDispatcher;
}
}

WebSocketChannelHandler.java 处理socket消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@ChannelHandler.Sharable
public class WebSocketChannelHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketChannelHandler.class);
private final ConnectionManager connectionManager;
private final PacketReceiver receiver;

public WebSocketChannelHandler(ConnectionManager connectionManager, PacketReceiver receiver) {
this.connectionManager = connectionManager;
this.receiver = receiver;
}
//接收消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof TextWebSocketFrame) {
String text = ((TextWebSocketFrame) frame).text();
Connection connection = connectionManager.get(ctx.channel());
Packet packet = PacketDecoder.decodeFrame(text);
LOGGER.debug("channelRead conn={}, packet={}", ctx.channel(), connection.getSessionContext(), packet);
receiver.onReceive(packet, connection);
} else {
String message = "unsupported frame type: " + frame.getClass().getName();
throw new UnsupportedOperationException(message);
}
}

//连接异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Connection connection = connectionManager.get(ctx.channel());
Logs.CONN.error("client caught ex, conn={}", connection);
LOGGER.error("caught an ex, channel={}, conn={}", ctx.channel(), connection, cause);
ctx.close();
}

//建立连接
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Logs.CONN.info("client connected conn={}", ctx.channel());
Connection connection = new NettyConnection();
connection.init(ctx.channel(), false);
connectionManager.add(connection);
}

//断开连接
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Connection connection = connectionManager.removeAndClose(ctx.channel());
EventBus.post(new ConnectionCloseEvent(connection));
Logs.CONN.info("client disconnected conn={}", connection);
}
}
------ 本文结束 感谢您的阅读 ------